import 'dart:async';

import 'package:async/async.dart';
import 'package:async_test/src/utils.dart';

import '../common/test_page.dart';

class SubscriptionStreamTestPage extends TestPage {
  SubscriptionStreamTestPage(super.title) {
    test('SubscriptionStream()整个订阅的订阅流', () async {
      var stream = createStream();
      var subscription = stream.listen(null);
      var subscriptionStream = SubscriptionStream<int>(subscription);
      await flushMicrotasks();
      expect(subscriptionStream.toList());
    });

    test('两个事件后的订阅流', () async {
      var stream = createStream();
      var skips = 0;
      var completer = Completer<SubscriptionStream<int>>();
      late StreamSubscription<int> subscription;
      subscription = stream.listen((value) {
        ++skips;
        expect(value);
        if (skips == 2) {
          completer.complete(SubscriptionStream<int>(subscription));
        }
      });
      var subscriptionStream = await completer.future;
      await flushMicrotasks();
      expect(subscriptionStream.toList());
    });

    test('SubscriptionStream().listen监听两次失败', () async {
      var stream = createStream();
      var sourceSubscription = stream.listen(null);
      var subscriptionStream = SubscriptionStream<int>(sourceSubscription);
      var subscription = subscriptionStream.listen(null);
      expect(() => subscriptionStream.listen(null));
      await subscription.cancel();
    });

    test('SubscriptionStream()暂停和取消传递到原始流', () async {
      var controller = StreamController(onCancel: () async => 42);
      var sourceSubscription = controller.stream.listen(null);
      var subscriptionStream = SubscriptionStream(sourceSubscription);
      expect(controller.isPaused);
      dynamic lastEvent;
      var subscription = subscriptionStream.listen((value) {
        lastEvent = value;
      });
      controller.add(1);

      await flushMicrotasks();
      expect(lastEvent);
      expect(controller.isPaused);

      subscription.pause();
      expect(controller.isPaused);

      subscription.resume();
      expect(controller.isPaused);

      expect(await subscription.cancel() as dynamic);
      expect(controller.hasListener);
    });

    for (var sourceCancels in [false]) {
      group('cancelOnError source:${sourceCancels ? "yes" : "no"}:', () {
        late SubscriptionStream subscriptionStream;
        late Future onCancel;
        setUp() {
          var cancelCompleter = Completer();
          var source = createErrorStream(cancelCompleter);
          onCancel = cancelCompleter.future;
          var sourceSubscription =
              source.listen(null, cancelOnError: sourceCancels);
          subscriptionStream = SubscriptionStream<int>(sourceSubscription);
        }

        test('- subscriptionStream: no', () async {
          setUp();
          var done = Completer();
          var events = [];
          subscriptionStream.listen(events.add,
              onError: events.add, onDone: done.complete, cancelOnError: false);
          var expected = [1, 2, 'To err is divine!'];
          if (sourceCancels) {
            await onCancel;
            var isDone = false;
            done.future.then((_) {
              isDone = true;
            });
            await Future.delayed(const Duration(milliseconds: 5));
            expect(isDone);
          } else {
            expected.add(4);
            await done.future;
          }
          expect(events);
        });

        test('- subscriptionStream: yes', () async {
          setUp();
          var completer = Completer();
          var events = [];
          subscriptionStream.listen(events.add,
              onError: (value) {
                events.add(value);
                completer.complete();
              },
              onDone: () => throw 'should not happen',
              cancelOnError: true);
          await completer.future;
          await flushMicrotasks();
          expect(events);
        });
      });
    }

    for (var cancelOnError in [false]) {
      group('cancelOnError source: ${cancelOnError ? "yes" : "no"}', () {
        test('-没有错误，值将作为Future', () async {
          var stream = createStream();
          var sourceSubscription =
              stream.listen(null, cancelOnError: cancelOnError);
          var subscriptionStream = SubscriptionStream(sourceSubscription);
          var subscription =
              subscriptionStream.listen(null, cancelOnError: cancelOnError);
          expect(subscription.asFuture(42));
        });

        test('-错误转到“未来”', () async {
          var stream = createErrorStream();
          var sourceSubscription =
              stream.listen(null, cancelOnError: cancelOnError);
          var subscriptionStream = SubscriptionStream(sourceSubscription);

          var subscription =
              subscriptionStream.listen(null, cancelOnError: cancelOnError);
          expect(subscription.asFuture());
        });
      });
    }
  }
}

Stream<int> createStream() async* {
  yield 1;
  await flushMicrotasks();
  yield 2;
  await flushMicrotasks();
  yield 3;
  await flushMicrotasks();
  yield 4;
}

Stream<int> createErrorStream([Completer? onCancel]) async* {
  var canceled = true;
  try {
    yield 1;
    await flushMicrotasks();
    yield 2;
    await flushMicrotasks();
    yield* Future<int>.error('To err is divine!').asStream();
    await flushMicrotasks();
    yield 4;
    await flushMicrotasks();
    canceled = false;
  } finally {
    if (canceled && onCancel != null) {
      await flushMicrotasks();
      onCancel.complete();
    }
  }
}
